-
Notifications
You must be signed in to change notification settings - Fork 916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add collect list to dask-cudf groupby aggregations #8045
Add collect list to dask-cudf groupby aggregations #8045
Conversation
To illustrate the problem: >>> data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""
>>> df = pd.read_csv(StringIO(data))
>>> ddf = dd.from_pandas(df, 2)
>>> gdf = cudf.from_pandas(df)
>>> gddf = dask_cudf.from_cudf(gdf, 2)
>>> print(ddf.groupby("a").agg({"b":list}).compute())
b
a
1595802 [1611:0.92, 1610:0.07]
1524246 [1807:0.92, 1608:0.07]
>>> print(gddf.groupby("a").agg({"b":list}).compute())
b
a
1524246 [[1807:0.92, 1608:0.07]]
1595802 [[1611:0.92, 1610:0.07]] |
Codecov Report
@@ Coverage Diff @@
## branch-21.08 #8045 +/- ##
===============================================
Coverage ? 10.64%
===============================================
Files ? 109
Lines ? 18653
Branches ? 0
===============================================
Hits ? 1985
Misses ? 16668
Partials ? 0 Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great @charlesbluca - Thanks!
My only concern is that you are explicity setting the index names to None
in the test. Is the "collection" result somehow different from other aggregations?
list: "collect", | ||
"list": "collect", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this approach.
@@ -478,6 +513,9 @@ def _finalize_gb_agg( | |||
gb.drop(columns=[sum_name], inplace=True) | |||
if "count" not in agg_list: | |||
gb.drop(columns=[count_name], inplace=True) | |||
if "collect" in agg_list: | |||
collect_name = _make_name(col, "collect", sep=sep) | |||
gb[collect_name] = gb[collect_name].list.concat() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth the wait - Thanks for this concat
method @shwina :)
@pytest.mark.parametrize( | ||
"func", | ||
[ | ||
lambda df: df.groupby("x").agg({"y": "collect"}), | ||
pytest.param( | ||
lambda df: df.groupby("x").y.agg("collect"), marks=pytest.mark.skip | ||
), | ||
], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to define func
this way? Am I misunderstanding, or will the second of two cases always skipped?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This param skip, and the index nulling, I lifted from another dask-cudf groupby test:
cudf/python/dask_cudf/dask_cudf/tests/test_groupby.py
Lines 47 to 77 in 5b8895d
@pytest.mark.parametrize( | |
"func", | |
[ | |
lambda df: df.groupby("x").agg({"y": "max"}), | |
pytest.param( | |
lambda df: df.groupby("x").y.agg(["sum", "max"]), | |
marks=pytest.mark.skip, | |
), | |
], | |
) | |
def test_groupby_agg(func): | |
pdf = pd.DataFrame( | |
{ | |
"x": np.random.randint(0, 5, size=10000), | |
"y": np.random.normal(size=10000), | |
} | |
) | |
gdf = cudf.DataFrame.from_pandas(pdf) | |
ddf = dask_cudf.from_cudf(gdf, npartitions=5) | |
a = func(gdf).to_pandas() | |
b = func(ddf).compute().to_pandas() | |
a.index.name = None | |
a.name = None | |
b.index.name = None | |
b.name = None | |
dd.assert_eq(a, b) |
I can see what happens when we don't set the index to None
here, but when that test isn't skipped we get an AssertionError
on the types:
__________________________________________ test_groupby_collect[<lambda>1] ___________________________________________
func = <function <lambda> at 0x7f3c804a0d30>
@pytest.mark.parametrize(
"func",
[
lambda df: df.groupby("x").agg({"y": "collect"}),
lambda df: df.groupby("x").y.agg("collect"),
],
)
def test_groupby_collect(func):
pdf = pd.DataFrame(
{
"x": np.random.randint(0, 5, size=10000),
"y": np.random.normal(size=10000),
}
)
gdf = cudf.DataFrame.from_pandas(pdf)
ddf = dask_cudf.from_cudf(gdf, npartitions=5)
a = func(gdf).to_pandas()
b = func(ddf).compute().to_pandas()
a.index.name = None
a.name = None
b.index.name = None
b.name = None
> dd.assert_eq(a, b)
dask_cudf/tests/test_groupby.py:155:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../compose/etc/conda/cuda_11.2.72/envs/rapids/lib/python3.8/site-packages/dask/dataframe/utils.py:559: in assert_eq
tm.assert_series_equal(a, b, check_names=check_names, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
left = 0 [-0.02279966962796973, -0.2268040371246616, 0....
1 [1.0547561143327269, 0.07632651478542447, -0.0...
2 [1.... [0.35305396010499146, 2.022936601816015, -0.02...
4 [0.7639835327097312, 0.9458744987601149, 0.370...
dtype: object
right = y
0 [-0.02279966962796973, -0.2268040371246616, 0....
1 [1.054756...796, 1.498...
3 [0.35305396010499146, 2.022936601816015, -0.02...
4 [0.7639835327097312, 0.9458744987601149, 0.370...
cls = <class 'pandas.core.series.Series'>
def _check_isinstance(left, right, cls):
"""
Helper method for our assert_* methods that ensures that
the two objects being compared have the right type before
proceeding with the comparison.
Parameters
----------
left : The first object being compared.
right : The second object being compared.
cls : The class type to check against.
Raises
------
AssertionError : Either `left` or `right` is not an instance of `cls`.
"""
cls_name = cls.__name__
if not isinstance(left, cls):
raise AssertionError(
f"{cls_name} Expected type {cls}, found {type(left)} instead"
)
if not isinstance(right, cls):
> raise AssertionError(
f"{cls_name} Expected type {cls}, found {type(right)} instead"
)
E AssertionError: Series Expected type <class 'pandas.core.series.Series'>, found <class 'pandas.core.frame.DataFrame'> instead
It looks like we are creating a dataframe here when we should be making a series.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we are creating a dataframe here when we should be making a series.
Ah - It seems like this was already a problem before this PR. In that case, it is probably okay to fix that in a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, do you know if there is an open issue for this problem? If not, I can open one so we can keep track of the follow up fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @charlesbluca - Note that I filed #8655 to make sure we address the Series/DataFrame inconsistency discussed here.
Thanks for opening the issue @rjzamora! |
@gpucibot merge |
Closes #7812
Adds support for cuDF's
collect
aggregation in dask-cuDF.